{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n",
    "\n",
    "# 16 - EMR & Docker"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import getpass\n",
    "\n",
    "import boto3\n",
    "\n",
    "import awswrangler as wr"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Enter your bucket name:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdin",
     "output_type": "stream",
     "text": [
      " ··········································\n"
     ]
    }
   ],
   "source": [
    "bucket = getpass.getpass()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Enter your Subnet ID:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdin",
     "output_type": "stream",
     "text": [
      " ························\n"
     ]
    }
   ],
   "source": [
    "subnet = getpass.getpass()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Build and Upload Docker Image to ECR repository\n",
    "\n",
    "Replace the `{ACCOUNT_ID}` placeholder."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    },
    "pycharm": {
     "name": "#%%writefile Dockerfile\n"
    }
   },
   "outputs": [],
   "source": [
    "%%writefile Dockerfile\n",
    "\n",
    "FROM amazoncorretto:8\n",
    "\n",
    "RUN yum -y update\n",
    "RUN yum -y install yum-utils\n",
    "RUN yum -y groupinstall development\n",
    "\n",
    "RUN yum list python3*\n",
    "RUN yum -y install python3 python3-dev python3-pip python3-virtualenv\n",
    "\n",
    "RUN python -V\n",
    "RUN python3 -V\n",
    "\n",
    "ENV PYSPARK_DRIVER_PYTHON python3\n",
    "ENV PYSPARK_PYTHON python3\n",
    "\n",
    "RUN pip3 install --upgrade pip\n",
    "RUN pip3 install awswrangler\n",
    "\n",
    "RUN python3 -c \"import awswrangler as wr\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "docker build -t 'local/emr-wrangler' .\n",
    "aws ecr create-repository --repository-name emr-wrangler\n",
    "docker tag local/emr-wrangler {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\n",
    "eval $(aws ecr get-login --region us-east-1 --no-include-email)\n",
    "docker push {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating EMR Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster_id = wr.emr.create_cluster(subnet, docker=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Refresh ECR credentials in the cluster (expiration time: 12h )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'s-1B0O45RWJL8CL'"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Uploading application script to Amazon S3 (PySpark)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "script = \"\"\"\n",
    "from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder.appName(\"docker-awswrangler\").getOrCreate()\n",
    "sc = spark.sparkContext\n",
    "\n",
    "print(\"Spark Initialized\")\n",
    "\n",
    "import awswrangler as wr\n",
    "\n",
    "print(f\"awswrangler version: {wr.__version__}\")\n",
    "\"\"\"\n",
    "\n",
    "boto3.client(\"s3\").put_object(Body=script, Bucket=bucket, Key=\"test_docker.py\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Submit PySpark step"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n",
    "\n",
    "step_id = wr.emr.submit_spark_step(cluster_id, f\"s3://{bucket}/test_docker.py\", docker_image=DOCKER_IMAGE)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Wait Step"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n",
    "    pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Terminate Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "wr.emr.terminate_cluster(cluster_id)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Another example with custom configurations"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster_id = wr.emr.create_cluster(\n",
    "    cluster_name=\"my-demo-cluster-v2\",\n",
    "    logging_s3_path=f\"s3://{bucket}/emr-logs/\",\n",
    "    emr_release=\"emr-6.7.0\",\n",
    "    subnet_id=subnet,\n",
    "    emr_ec2_role=\"EMR_EC2_DefaultRole\",\n",
    "    emr_role=\"EMR_DefaultRole\",\n",
    "    instance_type_master=\"m5.2xlarge\",\n",
    "    instance_type_core=\"m5.2xlarge\",\n",
    "    instance_ebs_size_master=50,\n",
    "    instance_ebs_size_core=50,\n",
    "    instance_num_on_demand_master=0,\n",
    "    instance_num_on_demand_core=0,\n",
    "    instance_num_spot_master=1,\n",
    "    instance_num_spot_core=2,\n",
    "    spot_bid_percentage_of_on_demand_master=100,\n",
    "    spot_bid_percentage_of_on_demand_core=100,\n",
    "    spot_provisioning_timeout_master=5,\n",
    "    spot_provisioning_timeout_core=5,\n",
    "    spot_timeout_to_on_demand_master=False,\n",
    "    spot_timeout_to_on_demand_core=False,\n",
    "    python3=True,\n",
    "    docker=True,\n",
    "    spark_glue_catalog=True,\n",
    "    hive_glue_catalog=True,\n",
    "    presto_glue_catalog=True,\n",
    "    debugging=True,\n",
    "    applications=[\"Hadoop\", \"Spark\", \"Hive\", \"Zeppelin\", \"Livy\"],\n",
    "    visible_to_all_users=True,\n",
    "    maximize_resource_allocation=True,\n",
    "    keep_cluster_alive_when_no_steps=True,\n",
    "    termination_protected=False,\n",
    "    spark_pyarrow=True,\n",
    ")\n",
    "\n",
    "wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/emr/\")\n",
    "\n",
    "DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n",
    "\n",
    "step_id = wr.emr.submit_spark_step(cluster_id, f\"s3://{bucket}/test_docker.py\", docker_image=DOCKER_IMAGE)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n",
    "    pass\n",
    "\n",
    "wr.emr.terminate_cluster(cluster_id)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.9.14",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
